45. Python自定义函数

为什么要学习自定义函数?

  • 抽象化:将复杂实现隐藏在简洁接口之后
  • 模块化:大问题分解为小函数,单一职责
  • 复用性:一次编写,多处调用,减少出错
  • 可读性:函数名就是最好的文档

函数在金融分析中的意义

  • 策略封装:交易策略参数化配置
  • 指标计算:MA、MACD、RSI 等封装为函数
  • 风险管理:VaR、最大回撤等函数化
  • 回测框架:整个流程就是一系列函数的组合

函数定义的基本语法

使用 def 关键字定义函数:

def 函数名(参数列表):
    '''文档字符串'''
    函数体
    return 返回值
  • def:定义函数的关键字
  • 函数名:用 snake_case 命名,动词开头
  • 文档字符串:说明函数用途、参数和返回值
  • return:返回计算结果

函数定义与调用示例

Listing 1
def greet(name):
    """
    问候函数:向指定名字的人问好

    参数:
        name (str): 要问候的人名

    返回:
        str: 包含问候语的字符串
    """
    return f'您好, {name}!'

# 调用函数
# 函数名后跟括号,括号内传入实际参数
message = greet('李四')

# 打印返回值
print(message)  # 输出: 您好, 李四!

# 可以在表达式中直接使用函数返回值
print(greet('张三'))  # 输出: 您好, 张三!
您好, 李四!
您好, 张三!

函数调用机制

Listing 2
def demonstrate_call_process(x, y):
    """
    演示函数调用的完整过程
    """
    print(f'接收到参数: x={x}, y={y}')

    # 执行函数逻辑
    result = x + y

    print(f'计算结果: {result}')

    # 返回结果
    return result

# 函数调用过程分解
# 1. 创建栈帧(Stack Frame):为函数调用分配内存空间
# 2. 参数传递:将实参值绑定到形参名
# 3. 执行函数体:按顺序执行函数内语句
# 4. 返回值:将结果返回给调用者
# 5. 销毁栈帧:释放函数调用占用的内存

a, b = 10, 20
print('准备调用函数...')

# 函数调用:程序跳转到函数体执行
value = demonstrate_call_process(a, b)

print(f'函数返回值: {value}')
print('函数调用结束,继续执行主程序')
准备调用函数...
接收到参数: x=10, y=20
计算结果: 30
函数返回值: 30
函数调用结束,继续执行主程序

空函数与占位符

开发时需要先定义函数框架,稍后填充实现:

Listing 3
def placeholder_function():
    """
    这是一个占位函数,稍后实现

    使用pass语句避免语法错误
    """
    pass  # pass是Python的空操作语句

# 调用空函数不会产生任何效果
placeholder_function()

# 另一种常见的占位写法:使用省略号(...)
def another_placeholder():
    """另一个占位函数"""
    ...  # 省略号在Python中也是有效的表达式

another_placeholder()  # 不会报错

位置参数

最基础的参数类型,按照定义时的顺序传递:

Listing 4
def calculate_profit(revenue, cost):
    """
    计算利润的函数

    参数:
        revenue (float): 营业收入
        cost (float): 营业成本

    返回:
        float: 利润额
    """
    profit = revenue - cost
    return profit

# 调用时必须按照定义的顺序传递参数
profit1 = calculate_profit(1000, 600)
print(f'利润1: {profit1}')  # 输出: 利润1: 400

# 如果顺序错误,会得到错误结果
profit2 = calculate_profit(600, 1000)
print(f'利润2: {profit2}')  # 输出: 利润2: -400 (错误!)
利润1: 400
利润2: -400

关键字参数

通过参数名传递,不受顺序限制,代码更清晰:

Listing 5
def calculate_profit(revenue, cost):
    """计算利润"""
    return revenue - cost

# 使用关键字参数,顺序可以任意
profit1 = calculate_profit(revenue=1000, cost=600)
profit2 = calculate_profit(cost=600, revenue=1000)

print(f'利润1: {profit1}')  # 输出: 利润1: 400
print(f'利润2: {profit2}')  # 输出: 利润2: 400 (正确!)

# 关键字参数提高了代码的可读性
# 尤其当函数有多个参数时,不容易混淆
利润1: 400
利润2: 400

默认参数

为参数提供默认值,使函数调用更灵活:

Listing 6
def calculate_tax(salary, rate=0.05, insurance_rate=0.02):
    """
    计算税后工资

    参数:
        salary (float): 税前工资
        rate (float): 税率,默认5%
        insurance_rate (float): 社保扣除比例,默认2%

    返回:
        float: 税后工资
    """
    # 计算税金
    tax = salary * rate

    # 计算社保扣除
    insurance = salary * insurance_rate

    # 计算实发工资
    net_salary = salary - tax - insurance

    return net_salary

# 情况1:只提供必填参数
net1 = calculate_tax(5000)
print(f'税后工资1(默认税率): {net1:.2f}元')

# 情况2:覆盖默认税率
net2 = calculate_tax(5000, rate=0.10)
print(f'税后工资2(10%税率): {net2:.2f}元')

# 情况3:使用关键字参数指定特定参数
net3 = calculate_tax(5000, insurance_rate=0.05)
print(f'税后工资3(5%社保): {net3:.2f}元')

# 情况4:所有参数都指定
net4 = calculate_tax(5000, rate=0.08, insurance_rate=0.03)
print(f'税后工资4(全部指定): {net4:.2f}元')
税后工资1(默认税率): 4650.00元
税后工资2(10%税率): 4400.00元
税后工资3(5%社保): 4500.00元
税后工资4(全部指定): 4450.00元

可变默认参数陷阱

使用列表等可变对象作为默认参数时要特别小心:

Listing 7
# 错误示例:使用列表作为默认参数
def append_item(item, items=[]):  # 危险!
    """向列表添加项目"""
    items.append(item)
    return items

# 第一次调用
result1 = append_item('A')
print(f'第一次: {result1}')  # 输出: ['A']

# 第二次调用
result2 = append_item('B')
print(f'第二次: {result2}')  # 输出: ['A', 'B'] 意外!

# 问题:默认列表在函数定义时创建,所有调用共享同一个列表

# 正确做法:使用None作为默认值
def append_item_correct(item, items=None):
    """向列表添加项目(正确版本)"""
    if items is None:  # 每次调用都创建新列表
        items = []
    items.append(item)
    return items

# 测试正确版本
result3 = append_item_correct('A')
print(f'正确第一次: {result3}')  # 输出: ['A']

result4 = append_item_correct('B')
print(f'正确第二次: {result4}')  # 输出: ['B'] 符合预期!
第一次: ['A']
第二次: ['A', 'B']
正确第一次: ['A']
正确第二次: ['B']

可变位置参数 *args

接收任意数量的位置参数:

Listing 8
def sum_all(*args):
    """
    计算所有传入数字的和

    参数:
        *args: 可变数量的位置参数

    返回:
        float/int: 所有参数的和
    """
    print(f'接收到 {len(args)} 个参数: {args}')

    # args是一个元组,包含所有位置参数
    total = sum(args)

    return total

# 调用示例
result1 = sum_all(1, 2, 3)
print(f'和1: {result1}')  # 输出: 和1: 6

result2 = sum_all(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
print(f'和2: {result2}')  # 输出: 和2: 55

# 也可以传递列表或元组,使用*解包
numbers = [10, 20, 30, 40]
result3 = sum_all(*numbers)  # *将列表解包为独立参数
print(f'和3: {result3}')  # 输出: 和3: 100
接收到 3 个参数: (1, 2, 3)
和1: 6
接收到 10 个参数: (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
和2: 55
接收到 4 个参数: (10, 20, 30, 40)
和3: 100

可变关键字参数 **kwargs

接收任意数量的关键字参数:

Listing 9
def create_portfolio(**kwargs):
    """
    创建投资组合

    参数:
        **kwargs: 股票代码和持仓数量的键值对

    返回:
        dict: 投资组合字典
    """
    print('构建投资组合...')

    # kwargs是一个字典,包含所有关键字参数
    for symbol, shares in kwargs.items():
        print(f'  {symbol}: {shares}股')

    return kwargs

# 调用示例
portfolio1 = create_portfolio(AAPL=100, MSFT=50, GOOGL=30)
print(f'组合1: {portfolio1}')

# 也可以传递字典,使用**解包
stocks = {'600519.SH': 500, '000858.SZ': 1000}
portfolio2 = create_portfolio(**stocks)  # **将字典解包为关键字参数
print(f'组合2: {portfolio2}')
构建投资组合...
  AAPL: 100股
  MSFT: 50股
  GOOGL: 30股
组合1: {'AAPL': 100, 'MSFT': 50, 'GOOGL': 30}
构建投资组合...
  600519.SH: 500股
  000858.SZ: 1000股
组合2: {'600519.SH': 500, '000858.SZ': 1000}

参数组合规则

多种参数类型同时使用时,必须遵循顺序:

Listing 10
def complex_function(a, b, c=10, *args, **kwargs):
    """
    演示各种参数的组合使用

    参数顺序规则:
    1. 位置参数 (a, b)
    2. 默认参数 (c=10)
    3. 可变位置参数 (*args)
    4. 可变关键字参数 (**kwargs)
    """
    print(f'a={a}, b={b}, c={c}')
    print(f'args: {args}')
    print(f'kwargs: {kwargs}')

# 调用示例
complex_function(1, 2, 3, 4, 5, x=100, y=200)

# 输出:
# a=1, b=2, c=3
# args: (4, 5)
# kwargs: {'x': 100, 'y': 200}
a=1, b=2, c=3
args: (4, 5)
kwargs: {'x': 100, 'y': 200}

单一返回值

Listing 11
def calculate_return(initial, final):
    """
    计算简单收益率

    参数:
        initial (float): 初始价格
        final (float): 最终价格

    返回:
        float: 收益率(小数形式)
    """
    return (final - initial) / initial

# 使用示例
initial_price = 100
final_price = 115

return_rate = calculate_return(initial_price, final_price)
print(f'收益率: {return_rate:.2%}')  # 输出: 收益率: 15.00%
收益率: 15.00%

多返回值

Python 函数可以返回多个值(实际是元组):

Listing 12
def calculate_return_metrics(prices):
    """
    计算收益率、最大回撤和波动率

    参数:
        prices (list): 价格序列

    返回:
        tuple: (收益率, 最大回撤, 波动率)
    """
    # 计算收益率
    initial_price = prices[0]
    final_price = prices[-1]
    return_rate = (final_price - initial_price) / initial_price

    # 计算最大回撤
    max_price = max(prices)
    max_drawdown = (max_price - min(prices)) / max_price

    # 计算波动率(简化版)
    import statistics
    volatility = statistics.stdev(prices) / statistics.mean(prices)

    # 返回多个值,实际是返回一个元组
    return return_rate, max_drawdown, volatility

# 调用函数
prices = [100, 105, 102, 108, 112, 110, 115]

# 方式1:接收整个元组
metrics = calculate_return_metrics(prices)
print(f'所有指标(元组): {metrics}')

# 方式2:解包到多个变量
ret, dd, vol = calculate_return_metrics(prices)
print(f'收益率: {ret:.2%}')
print(f'最大回撤: {dd:.2%}')
print(f'波动率: {vol:.2%}')

# 方式3:只接收部分值,使用_忽略不需要的
ret_only, _, _ = calculate_return_metrics(prices)
print(f'只要收益率: {ret_only:.2%}')
所有指标(元组): (0.15, 0.13043478260869565, 0.05037419394913779)
收益率: 15.00%
最大回撤: 13.04%
波动率: 5.04%
只要收益率: 15.00%

提前返回(Early Return)

函数可以在多个地方返回,处理边界条件:

Listing 13
def calculate_yearly_return(prices):
    """
    计算年化收益率,包含错误处理

    参数:
        prices (list): 价格序列

    返回:
        float: 年化收益率,如果出错返回None
    """
    # 边界条件1:数据不足
    if len(prices) < 2:
        print('错误:至少需要2个价格点')
        return None  # 提前返回

    # 边界条件2:价格为0或负数
    if prices[0] <= 0:
        print('错误:初始价格必须为正数')
        return None  # 提前返回

    # 正常逻辑
    initial = prices[0]
    final = prices[-1]

    # 计算年化收益率(假设252个交易日)
    trading_days = len(prices)
    yearly_return = (final / initial) ** (252 / trading_days) - 1

    return yearly_return  # 正常返回

# 测试用例
test1 = calculate_yearly_return([100, 105, 110])
print(f'测试1(正常): {test1:.2%}' if test1 else test1)

test2 = calculate_yearly_return([100])  # 数据不足
print(f'测试2(数据不足): {test2}')

test3 = calculate_yearly_return([0, 100])  # 初始价格为0
print(f'测试3(价格为0): {test3}')
测试1(正常): 299806.28%
错误:至少需要2个价格点
测试2(数据不足): None
错误:初始价格必须为正数
测试3(价格为0): None

无返回值与 None

没有 return 语句的函数返回 None

Listing 14
def log_message(message):
    """记录日志消息"""
    print(f'[LOG] {message}')
    # 没有return语句,返回None

result = log_message('系统启动')
print(f'返回值: {result}')  # 输出: 返回值: None
print(f'返回值类型: {type(result)}')  # 输出: <class 'NoneType'>

def explicit_none():
    """显式返回None"""
    print('执行完毕')
    return None  # 显式返回None

result2 = explicit_none()
print(f'返回值2: {result2}')  # 输出: 返回值2: None
[LOG] 系统启动
返回值: None
返回值类型: <class 'NoneType'>
执行完毕
返回值2: None

递归函数:什么是递归?

递归是函数调用自身的编程技巧。正确的递归必须包含:

  • 基准情况(Base Case):递归终止的条件
  • 递归情况(Recursive Case):将问题分解为更小的子问题

递归示例:计算阶乘

Listing 15
def factorial(n):
    """
    计算阶乘:n! = n × (n-1) × ... × 2 × 1

    参数:
        n (int): 非负整数

    返回:
        int: n的阶乘

    递归关系:
        - 0! = 1 (基准情况)
        - n! = n × (n-1)! (递归情况)
    """
    # 基准情况:递归的终止条件
    if n <= 1:
        return 1

    # 递归情况:将问题分解
    # n! = n × (n-1)!
    return n * factorial(n - 1)

# 使用示例
print(f'0! = {factorial(0)}')  # 输出: 0! = 1
print(f'5! = {factorial(5)}')  # 输出: 5! = 120
print(f'10! = {factorial(10)}')  # 输出: 10! = 3628800

# 递归执行过程演示(以factorial(3)为例)
# factorial(3)
# → 3 * factorial(2)
#     → 2 * factorial(1)
#         → 1 (基准情况)
#     = 2 * 1 = 2
# = 3 * 2 = 6
0! = 1
5! = 120
10! = 3628800

递归在金融中的应用:折现现金流

Listing 16
def present_value(cash_flows, discount_rate, period=0):
    """
    使用递归计算现金流的现值

    参数:
        cash_flows (list): 现金流列表
        discount_rate (float): 折现率
        period (int): 当前期数(内部使用)

    返回:
        float: 所有现金流的现值

    递归关系:
        PV = CF₀ + CF₁/(1+r) + CF₂/(1+r)² + ...
    """
    # 基准情况:没有更多现金流
    if period >= len(cash_flows):
        return 0.0

    # 获取当前期现金流
    cf = cash_flows[period]

    # 计算当前现金流的现值
    discount_factor = (1 + discount_rate) ** period
    pv = cf / discount_factor

    # 递归计算剩余现金流的现值
    remaining_pv = present_value(cash_flows, discount_rate, period + 1)

    # 返回总现值
    return pv + remaining_pv

# 示例:3年期债券
cash_flows = [50, 50, 1050]  # 前两年每年50利息,第三年本金+利息
rate = 0.05  # 5%折现率

pv = present_value(cash_flows, rate)
print(f'现值: {pv:.2f}元')

# 递归调用过程:
# present_value([50, 50, 1050], 0.05, 0)
# → 50/(1.05)⁰ + present_value([50, 50, 1050], 0.05, 1)
#     → 50/(1.05)¹ + present_value([50, 50, 1050], 0.05, 2)
#         → 1050/(1.05)² + present_value([50, 50, 1050], 0.05, 3)
#             → 0 (基准情况)
# = 1000.0
现值: 1050.00元

递归的深度限制

Listing 17
import sys

# 查看默认递归深度限制
print(f'递归深度限制: {sys.getrecursionlimit()}')

# 尝试超过限制会导致错误
def deep_recursion(n):
    """深度递归演示"""
    if n <= 0:
        return 0
    return n + deep_recursion(n - 1)

# 这会导致 RecursionError
try:
    result = deep_recursion(2000)
except RecursionError as e:
    print(f'错误:递归深度超限 - {e}')

# 解决方法:使用循环代替递归
def iterative_sum(n):
    """循环版本,无深度限制"""
    total = 0
    for i in range(n + 1):
        total += i
    return total

result = iterative_sum(2000)
print(f'循环版本结果: {result}')
递归深度限制: 3000
循环版本结果: 2001000

变量作用域:LEGB 规则

Python 使用 LEGB 规则查找变量:

层级 名称 含义
L Local 局部作用域(函数内部)
E Enclosing 嵌套函数的外层函数作用域
G Global 全局作用域(模块级别)
B Built-in 内置作用域(Python 内置模块)

变量作用域示例

Listing 18
# 全局变量(G层)
x = 10  # 全局变量x
y = 100  # 全局变量y

def outer_function():
    """外层函数(E层)"""

    # 外层函数的局部变量
    y = 20  # 遮蔽全局变量y

    def inner_function():
        """内层函数(L层)"""

        # 内层函数的局部变量
        z = 5  # 局部变量z

        # 访问各层变量
        print(f'内层局部变量 z: {z}')
        print(f'外层变量 y: {y}')
        print(f'全局变量 x: {x}')

    # 调用内层函数
    inner_function()

# 调用外层函数
outer_function()

# 在全局作用域访问x和y
print(f'全局变量 x: {x}')
print(f'全局变量 y: {y}')

# 尝试访问局部变量会报错
try:
    print(z)
except NameError as e:
    print(f'错误:局部变量在外部不可访问 - {e}')
内层局部变量 z: 5
外层变量 y: 20
全局变量 x: 10
全局变量 x: 10
全局变量 y: 100
错误:局部变量在外部不可访问 - name 'z' is not defined

global 关键字

在函数内部修改全局变量:

Listing 19
# 全局变量
counter = 0
total_sum = 0

def increment_counter():
    """增加计数器"""
    global counter  # 声明使用全局变量
    counter += 1  # 修改全局变量
    print(f'计数器: {counter}')

def add_to_total(value):
    """累加到总和"""
    global total_sum  # 声明使用全局变量
    total_sum += value
    print(f'累加后总和: {total_sum}')

# 测试
print(f'初始counter: {counter}')
increment_counter()  # counter变为1
increment_counter()  # counter变为2
print(f'最终counter: {counter}')

print(f'初始total_sum: {total_sum}')
add_to_total(100)  # total_sum变为100
add_to_total(200)  # total_sum变为300
print(f'最终total_sum: {total_sum}')
初始counter: 0
计数器: 1
计数器: 2
最终counter: 2
初始total_sum: 0
累加后总和: 100
累加后总和: 300
最终total_sum: 300

nonlocal 关键字

嵌套函数中修改外层函数的变量:

Listing 20
def make_accumulator():
    """累加器工厂函数"""

    # 外层函数的局部变量
    total = 0

    def accumulator(value):
        """内层累加函数"""
        nonlocal total  # 声明使用外层变量
        total += value
        return total

    return accumulator  # 返回内层函数

# 创建累加器
acc1 = make_accumulator()
acc2 = make_accumulator()

# 每个累加器有独立的状态
print(f'acc1累加10: {acc1(10)}')  # 输出: 10
print(f'acc1累加20: {acc1(20)}')  # 输出: 30
print(f'acc2累加100: {acc2(100)}')  # 输出: 100 (独立的total)
acc1累加10: 10
acc1累加20: 30
acc2累加100: 100

lambda 表达式:匿名函数

lambda 是创建匿名函数的简洁方式:

Listing 21
# lambda语法:lambda 参数列表: 表达式
# 特点:单个表达式,自动返回结果

# 示例1:简单平方函数
square = lambda x: x**2
print(f'5的平方: {square(5)}')  # 输出: 5的平方: 25

# 示例2:等价于以下普通函数
def square_normal(x):
    return x**2

print(f'6的平方: {square_normal(6)}')  # 输出: 6的平方: 36

# 示例3:多参数lambda
add = lambda x, y: x + y
print(f'3 + 5 = {add(3, 5)}')  # 输出: 3 + 5 = 8

# 示例4:lambda在金融中的应用 - 计算投资净值
net_value = lambda prices, shares: sum(p * s for p, s in zip(prices, shares))

prices = [10.5, 20.3, 15.8]
shares = [100, 200, 150]

value = net_value(prices, shares)
print(f'投资净值: {value:.2f}元')
5的平方: 25
6的平方: 36
3 + 5 = 8
投资净值: 7480.00元

lambda 的典型应用场景

Listing 22
# 场景1:与map()结合 - 对序列中每个元素应用函数
numbers = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x**2, numbers))
print(f'平方列表: {squared}')  # 输出: [1, 4, 9, 16, 25]

# 场景2:与filter()结合 - 筛选符合条件的元素
stocks = [
    ('AAPL', 150),
    ('MSFT', 280),
    ('GOOGL', 2700)
]

# 筛选价格大于200的股票
expensive = list(filter(lambda s: s[1] > 200, stocks))
print(f'高价股票: {expensive}')  # 输出: [('MSFT', 280), ('GOOGL', 2700)]

# 场景3:与sorted()结合 - 自定义排序
portfolio = [
    {'symbol': 'AAPL', 'shares': 100},
    {'symbol': 'MSFT', 'shares': 50},
    {'symbol': 'GOOGL', 'shares': 200}
]

# 按持股数量排序
sorted_by_shares = sorted(portfolio, key=lambda x: x['shares'])
print(f'按持股排序:')
for item in sorted_by_shares:
    print(f"  {item['symbol']}: {item['shares']}股")

# 场景4:与max()和min()结合 - 基于复杂条件找极值
# 找出收益率最高的股票
stock_returns = [('AAPL', 0.15), ('MSFT', 0.20), ('GOOGL', 0.12)]
best_stock = max(stock_returns, key=lambda x: x[1])
print(f'最佳股票: {best_stock[0]}, 收益率: {best_stock[1]:.2%}')
平方列表: [1, 4, 9, 16, 25]
高价股票: [('MSFT', 280), ('GOOGL', 2700)]
按持股排序:
  MSFT: 50股
  AAPL: 100股
  GOOGL: 200股
最佳股票: MSFT, 收益率: 20.00%

lambda 的局限性

lambda 只适合简短逻辑,复杂情况用普通函数:

Listing 23
# 不好的lambda示例:过于复杂
# calculate_tax_bad = lambda salary: salary * 0.05 if salary <= 5000 else salary * 0.10

# 更好的做法:使用普通函数
def calculate_tax(salary):
    """
    计算个人所得税

    参数:
        salary (float): 税前工资

    返回:
        float: 应缴税额
    """
    if salary <= 5000:
        return salary * 0.05
    else:
        return salary * 0.10

# 使用
print(f'税额(3000): {calculate_tax(3000):.2f}元')
print(f'税额(8000): {calculate_tax(8000):.2f}元')
税额(3000): 150.00元
税额(8000): 800.00元

高阶函数:map()

map() 对序列中每个元素应用函数:

Listing 24
def calculate_return(prices):
    """计算价格序列的收益率"""
    returns = []
    for i in range(1, len(prices)):
        ret = (prices[i] - prices[i-1]) / prices[i-1]
        returns.append(ret)
    return returns

# 使用map处理多个股票
stocks = {
    'AAPL': [150, 152, 148, 155],
    'MSFT': [280, 285, 282, 290],
    'GOOGL': [2700, 2720, 2680, 2750]
}

# 对每只股票应用收益率计算
returns_dict = dict(map(
    lambda item: (item[0], calculate_return(item[1])),
    stocks.items()
))

print('各股票收益率:')
for symbol, returns in returns_dict.items():
    print(f'{symbol}: {[f"{r:.2%}" for r in returns]}')
各股票收益率:
AAPL: ['1.33%', '-2.63%', '4.73%']
MSFT: ['1.79%', '-1.05%', '2.84%']
GOOGL: ['0.74%', '-1.47%', '2.61%']

高阶函数:filter()

filter() 筛选符合条件的元素:

Listing 25
stocks = [
    {'symbol': 'AAPL', 'price': 150, 'pe': 25},
    {'symbol': 'MSFT', 'price': 280, 'pe': 30},
    {'symbol': 'GOOGL', 'price': 2700, 'pe': 20},
    {'symbol': 'TSLA', 'price': 800, 'pe': 100}
]

# 筛选PE < 30的股票
value_stocks = list(filter(lambda s: s['pe'] < 30, stocks))

print('价值股票(PE < 30):')
for stock in value_stocks:
    print(f"  {stock['symbol']}: PE={stock['pe']}")
价值股票(PE < 30):
  AAPL: PE=25
  GOOGL: PE=20

高阶函数:reduce()

reduce() 对序列做累积计算:

Listing 26
from functools import reduce

def calculate_compound_return(returns):
    """
    计算复合收益率

    参数:
        returns (list): 期收益率列表

    返回:
        float: 复合收益率
    """
    # 复合收益率 = (1+r₁) × (1+r₂) × ... × (1+rₙ) - 1
    # 使用reduce累积计算
    compound = reduce(
        lambda acc, r: acc * (1 + r),  # 累积乘法
        returns,
        1.0  # 初始值
    )
    return compound - 1

# 示例:4个季度的收益率
quarterly_returns = [0.05, 0.03, 0.08, 0.02]

annual_return = calculate_compound_return(quarterly_returns)
print(f'年复合收益率: {annual_return:.2%}')
年复合收益率: 19.14%

函数文档:Google 风格 Docstring

Listing 27
def calculate_sharpe_ratio(returns, risk_free_rate=0.03):
    """
    计算夏普比率(Sharpe Ratio)

    夏普比率是衡量风险调整后收益的指标,计算公式为:
    Sharpe Ratio = (E[R] - Rf) / σ(R)
    其中E[R]是预期收益,Rf是无风险利率,σ(R)是收益标准差

    参数:
        returns (list[float]): 资产收益率序列,应为小数形式(如0.05表示5%)
        risk_free_rate (float, optional): 无风险利率,默认0.03(3%)

    返回:
        float: 夏普比率。值越大表示单位风险的超额收益越高

    异常:
        ValueError: 如果returns为空或包含无效值

    示例:
        >>> returns = [0.05, 0.03, 0.08, -0.02, 0.06]
        >>> sharpe = calculate_sharpe_ratio(returns)
        >>> print(f'夏普比率: {sharpe:.2f}')

    注意:
        - 收益率应为小数形式,不是百分比
        - 至少需要2个数据点
        - 如果收益标准差为0,返回None

    作者:
        Claude Code
        创建日期: 2024-01-15
    """
    import statistics

    # 验证输入
    if not returns or len(returns) < 2:
        raise ValueError('收益率序列至少需要2个数据点')

    # 计算平均收益
    avg_return = statistics.mean(returns)

    # 计算收益标准差
    std_return = statistics.stdev(returns)

    # 如果标准差为0,返回None(避免除零错误)
    if std_return == 0:
        return None

    # 计算夏普比率
    sharpe = (avg_return - risk_free_rate) / std_return

    return sharpe

# 使用示例
test_returns = [0.05, 0.03, 0.08, -0.02, 0.06]
try:
    sharpe_ratio = calculate_sharpe_ratio(test_returns)
    print(f'夏普比率: {sharpe_ratio:.2f}')
except ValueError as e:
    print(f'错误: {e}')
夏普比率: 0.26

函数设计最佳实践

  • 单一职责:每个函数只做一件事
  • 描述性命名:使用动词短语,如 calculate_sharpe_ratio()
  • 参数数量:不超过 5 个位置参数
  • 函数长度:理想不超过 20 行
  • 避免副作用:纯函数更易于测试和推理

总结

知识点 核心要点
函数定义 def + 函数名 + 参数 + return
参数类型 位置、关键字、默认、*args**kwargs
返回值 单返回值、多返回值(元组)、None
递归 基准情况 + 递归情况,注意深度限制
作用域 LEGB 规则,global/nonlocal
lambda 匿名函数,适合简短的一次性逻辑

⭐ 平台任务1:计算利率平均值

Listing 28
# ⚠️ 平台原始代码 - 请原样输入至教学平台(注释除外),平台才会判定答案正确
import pandas as pd  # 导入Pandas数据分析库

# 从Excel文件读取数据存入data
data = pd.read_excel(r"https://huoran.oss-cn-shenzhen.aliyuncs.com/20220811/xlsx/1557658167503511552.xlsx")

data.columns =["日期","Shibor","Libor","Hibor"]  # 定义列表data.columns

Shibor = []  # 定义列表Shibor

for i in data["Shibor"]:  # 遍历data["Shibor"]中的每个i

    Shibor.append(i)  # 将Shibor利率数据添加到列表

Libor = []  # 定义列表Libor

for i in data["Libor"]:  # 遍历data["Libor"]中的每个i

    Libor.append(i)  # 将美元Libor利率数据添加到列表

Hibor = []  # 定义列表Hibor

for i in data["Hibor"]:  # 遍历data["Hibor"]中的每个i
    
    Hibor.append(i)  # 将港元Hibor利率数据添加到列表

f_mean = lambda x: sum(x)/len(x) #用lambda函数定义并且参数x以列表的数据结构输入

Shibor_mean = f_mean(x=Shibor)  #计算Shibor的平均值

Libor_mean = f_mean(x=Libor)   #计算美元Libor的平均值

Hibor_mean = f_mean(x=Hibor)   #计算美元Hibor的平均值

print("3月11日到29日期间3个月期Shibor平均值",round(Shibor_mean,6))  # 输出3月11日到29日期间3个月期Shibor平均值

print("3月11日到29日期间3个月期美元Libor平均值",round(Libor_mean,6))  # 输出3月11日到29日期间3个月期美元Libor平均值

print("3月11日到29日期间3个月期Hibor平均值",round(Hibor_mean,6))  # 输出3月11日到29日期间3个月期Hibor平均值
3月11日到29日期间3个月期Shibor平均值 0.028065
3月11日到29日期间3个月期美元Libor平均值 0.026077
3月11日到29日期间3个月期Hibor平均值 0.017619

⭐ 平台任务2:计算利率波动率

Listing 29
# ⚠️ 平台原始代码 - 请原样输入至教学平台(注释除外),平台才会判定答案正确
import pandas as pd  # 导入Pandas数据分析库

# 从Excel文件读取数据存入data
data = pd.read_excel(r"https://huoran.oss-cn-shenzhen.aliyuncs.com/20220811/xlsx/1557658167503511552.xlsx")

data.columns =["日期","Shibor","Libor","Hibor"]  # 定义列表data.columns

Shibor = []  # 定义列表Shibor

for i in data["Shibor"]:  # 遍历data["Shibor"]中的每个i

    Shibor.append(i)  # 将Shibor利率数据添加到列表

Libor = []  # 定义列表Libor

for i in data["Libor"]:  # 遍历data["Libor"]中的每个i

    Libor.append(i)  # 将美元Libor利率数据添加到列表

Hibor = []  # 定义列表Hibor

for i in data["Hibor"]:  # 遍历data["Hibor"]中的每个i

    Hibor.append(i)  # 将港元Hibor利率数据添加到列表

def f_sigma(x):  # 定义函数f_sigma

  '''通过Python定义一个计算变量波动率的函数

  x:代表变量的样本值,可以用列表的数据结构输入'''

  n = len(x)  # 获取数据长度

  u_mean = sum(x)/n #计算变量样本值的均值

  z = []       #生成一个空列表

  for t in range(n):  # 遍历range(n)中的每个t

        z.append((x[t]-u_mean)**2)  # 将计算结果累加到z列表

  return (sum(z)/(n-1))**0.5    #计算波动率      

Shibor_sigma = f_sigma(x=Shibor) #计算Shibor波动率

Libor_sigma = f_sigma(x=Libor) #计算Libor波动率

Hibor_sigma = f_sigma(x=Hibor) #计算Hibor波动率

print("3月11日到29日期间3个月期Shibor波动率",round(Shibor_sigma,6))  # 输出3月11日到29日期间3个月期Shibor波动率

print("3月11日到29日期间3个月期没有Libor波动率",round(Libor_sigma,6))  # 输出3月11日到29日期间3个月期没有Libor波动率

print("3月11日到29日期间3个月期Hibor波动率",round(Hibor_sigma,6))  # 输出3月11日到29日期间3个月期Hibor波动率
3月11日到29日期间3个月期Shibor波动率 0.000286
3月11日到29日期间3个月期没有Libor波动率 0.000111
3月11日到29日期间3个月期Hibor波动率 0.000702

⭐ 平台任务3:利率比较

Listing 30
# ⚠️ 平台原始代码 - 请原样输入至教学平台(注释除外),平台才会判定答案正确
Shibor_mean = 0.028065  # 设置期数/数量为0

Libor_mean = 0.026077  # 设置期数/数量为0

Hibor_mean = 0.017619  # 设置期数/数量为0

Shibor_sigma = 0.000286  # 设置数组长度参数为0

Libor_sigma = 0.000111  # 设置数组长度参数为0

Hibor_sigma = 0.000702  # 设置数组长度参数为0

print(Shibor_mean > Libor_mean) #判断Shibor的平均值是否大于美元Libor的平均值

print(Libor_mean > Hibor_mean)  #判断美元Libor的平均值是否大于Hibor的平均值

print(Shibor_sigma > Libor_sigma) #判断Shibor的波动率是否大于Libor的波动率

print(Libor_sigma > Hibor_sigma) #判断美云Libor的波动率是否大于Hibor的波动率

print(Shibor_sigma > Hibor_sigma) #判断Shibor的波动率是否大于Hibor的波动率
True
True
True
False
False